# frozen_string_literal: true

require "grpc"
require "fileutils"
require "socket"
require "yaml"
require "shellwords"
require "base64"
require_relative "errors"
require_relative "kubernetes_client"
require_relative "../csi_services_pb"

module Csi
  module V1
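    # CSI Node service. Volumes are sparse backing files attached through loop devices,
    # mounted at the staging path, and bind-mounted into pods. When a node is drained,
    # volume data is migrated to the node the workload lands on next.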
    class NodeService < Node::Service
      include Csi::ServiceHelper

      MAX_VOLUMES_PER_NODE = 8
      VOLUME_BASE_PATH = "/var/lib/ubicsi"
      OLD_PV_NAME_ANNOTATION_KEY = "csi.ubicloud.com/old-pv-name"
      OLD_PVC_OBJECT_ANNOTATION_KEY = "csi.ubicloud.com/old-pvc-object"
      ACCEPTABLE_FS = ["ext4", "xfs"].freeze

      def self.mkdir_p
        FileUtils.mkdir_p(VOLUME_BASE_PATH)
      end

      def initialize(logger:, node_id:)
        @logger = logger
        @node_id = node_id
      end

      attr_reader :node_id

      def node_get_capabilities(req, _call)
        log_request_response(req, "node_get_capabilities") do |req_id|
          NodeGetCapabilitiesResponse.new(
            capabilities: [
              NodeServiceCapability.new(
                rpc: NodeServiceCapability::RPC.new(
                  type: NodeServiceCapability::RPC::Type::STAGE_UNSTAGE_VOLUME
                )
              )
            ]
          )
        end
      end

      def node_get_info(req, _call)
        log_request_response(req, "node_get_info") do |req_id|
          topology = Topology.new(
            segments: {
              "kubernetes.io/hostname" => @node_id
            }
          )
          NodeGetInfoResponse.new(
            node_id: @node_id,
            max_volumes_per_node: MAX_VOLUMES_PER_NODE,
            accessible_topology: topology
          )
        end
      end

      def run_cmd_output(*cmd, req_id:)
        output, _ = run_cmd(*cmd, req_id:)
        output
      end

      def is_mounted?(path, req_id:)
        # `mountpoint -q` succeeds only when path is a mountpoint; treat run_cmd's
        # second return value as the success flag, as at every other call site.
        _, ok = run_cmd("mountpoint", "-q", path, req_id:)
        ok
      end

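      # Returns the loop device attached to the backing file (via `losetup -j`),
      # or nil if the file is not attached to any loop device.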
      def find_loop_device(backing_file, req_id:)
        output, ok = run_cmd("losetup", "-j", backing_file, req_id:)
        if ok && !output.empty?
          loop_device = output.split(":", 2)[0].strip
          return loop_device if loop_device.start_with?("/dev/loop")
        end
        nil
      end

      def remove_loop_device(backing_file, req_id:)
        loop_device = find_loop_device(backing_file, req_id:)
        return unless loop_device

        output, ok = run_cmd("losetup", "-d", loop_device, req_id:)
        unless ok
          raise "Could not remove loop device: #{output}"
        end
      end

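      # Backing files are named after the volume ID, e.g. /var/lib/ubicsi/<volume_id>.img.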
      def self.backing_file_path(volume_id)
        File.join(VOLUME_BASE_PATH, "#{volume_id}.img")
      end

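      # A PVC still carrying the old-pv-name annotation was evacuated from another node
      # and its data has not been migrated to this node yet.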
      def pvc_needs_migration?(pvc)
        old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
        !old_pv_name.nil?
      end

      def find_file_system(loop_device, req_id:)
        output, ok = run_cmd("blkid", "-o", "value", "-s", "TYPE", loop_device, req_id:)
        unless ok
          raise "Failed to get the loop device filesystem status: #{output}"
        end

        output.strip
      end

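      # Stages a volume on this node: migrates data from the previous node if needed,
      # attaches the backing file to a loop device, formats and mounts it at the staging
      # path, then rolls back the migration bookkeeping on the PV and PVC.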
      def node_stage_volume(req, _call)
        log_request_response(req, "node_stage_volume") do |req_id|
          client = KubernetesClient.new(req_id:, logger: @logger)
          pvc = fetch_and_migrate_pvc(req_id, client, req)
          perform_node_stage_volume(req_id, pvc, req, _call)
          roll_back_reclaim_policy(req_id, client, req, pvc)
          remove_old_pv_annotation_from_pvc(req_id, client, pvc)
          NodeStageVolumeResponse.new
        rescue => e
          log_and_raise(req_id, e)
        end
      end

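      # Fetches the PVC behind this request and, if it was evacuated from another node,
      # copies the old backing file over before staging proceeds.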
      def fetch_and_migrate_pvc(req_id, client, req)
        pvc = client.get_pvc(req.volume_context["csi.storage.k8s.io/pvc/namespace"],
          req.volume_context["csi.storage.k8s.io/pvc/name"])
        if pvc_needs_migration?(pvc)
          migrate_pvc_data(req_id, client, pvc, req)
        end
        pvc
      end

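      # Creates the sparse backing file on first use, attaches it to a loop device,
      # ensures an acceptable filesystem for mount capabilities, and mounts the device
      # at the staging path unless it is already mounted.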
      def perform_node_stage_volume(req_id, pvc, req, _call)
        volume_id = req.volume_id
        staging_path = req.staging_target_path
        size_bytes = Integer(req.volume_context["size_bytes"], 10)
        backing_file = NodeService.backing_file_path(volume_id)

        unless File.exist?(backing_file)
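          # Reserve the requested size, then punch a hole over the whole range so the
          # file stays sparse and only consumes disk space as data is written.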
          output, ok = run_cmd("fallocate", "-l", size_bytes.to_s, backing_file, req_id:)
          unless ok
            raise GRPC::ResourceExhausted.new("Failed to allocate backing file: #{output}")
          end

          output, ok = run_cmd("fallocate", "--punch-hole", "--keep-size", "-o", "0", "-l", size_bytes.to_s, backing_file, req_id:)
          unless ok
            raise GRPC::ResourceExhausted.new("Failed to punch hole in backing file: #{output}")
          end
        end

        loop_device = find_loop_device(backing_file, req_id:)
        is_new_loop_device = loop_device.nil?
        if is_new_loop_device
          log_with_id(req_id, "Setting up new loop device for: #{backing_file}")
          output, ok = run_cmd("losetup", "--find", "--show", backing_file, req_id:)
          loop_device = output.strip
          unless ok && !loop_device.empty?
            raise "Failed to setup loop device: #{output}"
          end
        else
          log_with_id(req_id, "Loop device already exists: #{loop_device}")
        end

        if req.volume_capability.mount
          current_fs_type = find_file_system(loop_device, req_id:)
          if !current_fs_type.empty? && !ACCEPTABLE_FS.include?(current_fs_type)
            raise "Unacceptable file system type for #{loop_device}: #{current_fs_type}"
          end

          # Protobuf string fields default to "" rather than nil, so fall back to ext4 explicitly.
          desired_fs_type = req.volume_capability.mount.fs_type
          desired_fs_type = "ext4" if desired_fs_type.nil? || desired_fs_type.empty?
          if current_fs_type != "" && current_fs_type != desired_fs_type
            raise "Unexpected filesystem on volume. desired: #{desired_fs_type}, current: #{current_fs_type}"
          elsif current_fs_type == ""
            output, ok = run_cmd("mkfs.#{desired_fs_type}", loop_device, req_id:)
            unless ok
              raise "Failed to format device #{loop_device} with #{desired_fs_type}: #{output}"
            end
          end
        end
        unless is_mounted?(staging_path, req_id:)
          FileUtils.mkdir_p(staging_path)
          output, ok = run_cmd("mount", loop_device, staging_path, req_id:)
          unless ok
            raise "Failed to mount #{loop_device} to #{staging_path}: #{output}"
          end
        end
        # Block volume capabilities require no extra handling beyond this point.
      end

      def remove_old_pv_annotation_from_pvc(req_id, client, pvc)
        namespace, name = pvc["metadata"].values_at("namespace", "name")
        log_with_id(req_id, "Removing old pv annotation #{OLD_PV_NAME_ANNOTATION_KEY}")
        client.remove_pvc_annotation(namespace, name, OLD_PV_NAME_ANNOTATION_KEY)
      end

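      # After migration, switch the old PV's reclaim policy back from Retain (set while
      # the previous node was drained) to Delete so it can be cleaned up normally.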
      def roll_back_reclaim_policy(req_id, client, req, pvc)
        old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
        if old_pv_name.nil?
          return
        end

        pv = client.get_pv(old_pv_name)
        if pv.dig("spec", "persistentVolumeReclaimPolicy") == "Retain"
          pv["spec"]["persistentVolumeReclaimPolicy"] = "Delete"
          client.update_pv(pv)
        end
      end

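      # Copies the backing file from the node that previously held the volume, using
      # rsync over SSH executed on the host (via nsenter) and supervised by daemonizer2.
      # Raises CopyNotFinishedError while the transfer is pending or in progress so the
      # staging call is retried, and cleans up the unit once the copy has succeeded.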
      def migrate_pvc_data(req_id, client, pvc, req)
        old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
        pv = client.get_pv(old_pv_name)
        pv_node = client.extract_node_from_pv(pv)
        old_node_ip = client.get_node_ip(pv_node)
        old_data_path = NodeService.backing_file_path(pv["spec"]["csi"]["volumeHandle"])
        current_data_path = NodeService.backing_file_path(req.volume_id)

        daemonizer_unit_name = Shellwords.shellescape("copy_#{old_pv_name}")
        case run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "check", daemonizer_unit_name, req_id:)
        when "Succeeded"
          run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "clean", daemonizer_unit_name, req_id:)
        when "NotStarted"
          copy_command = ["rsync", "-az", "--inplace", "--compress-level=9", "--partial", "--whole-file", "-e", "ssh -T -c aes128-gcm@openssh.com -o Compression=no -x -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -i /home/ubi/.ssh/id_ed25519", "ubi@#{old_node_ip}:#{old_data_path}", current_data_path]
          run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "run", daemonizer_unit_name, *copy_command, req_id:)
          raise CopyNotFinishedError, "Old PV data is not copied yet"
        when "InProgress"
          raise CopyNotFinishedError, "Old PV data is not copied yet"
        when "Failed"
          raise "Copy old PV data failed"
        else
          raise "Daemonizer2 returned unknown status"
        end
      end

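      # Unstages a volume: if the node is unschedulable (being drained), the PV is
      # retained and the PVC recreated so the data can follow the workload; the loop
      # device is then detached and the staging path unmounted.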
      def node_unstage_volume(req, _call)
        log_request_response(req, "node_unstage_volume") do |req_id|
          backing_file = NodeService.backing_file_path(req.volume_id)
          client = KubernetesClient.new(req_id:, logger: @logger)
          # An unschedulable node is being drained; prepare the volume data for
          # migration to another node before tearing down the staging mount.
          unless client.node_schedulable?(@node_id)
            prepare_data_migration(client, req_id, req.volume_id)
          end
          remove_loop_device(backing_file, req_id:)
          staging_path = req.staging_target_path
          if is_mounted?(staging_path, req_id:)
            output, ok = run_cmd("umount", "-q", staging_path, req_id:)
            unless ok
              raise "Failed to unmount #{staging_path}: #{output}"
            end
          end
          NodeUnstageVolumeResponse.new
        rescue => e
          log_and_raise(req_id, e)
        end
      end

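      # Marks the volume for migration off this node: keep the old PV (Retain) and
      # recreate the PVC so it can bind to a new PV elsewhere.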
      def prepare_data_migration(client, req_id, volume_id)
        log_with_id(req_id, "Retaining pv with volume_id #{volume_id}")
        pv = retain_pv(req_id, client, volume_id)
        log_with_id(req_id, "Recreating pvc with volume_id #{volume_id}")
        recreate_pvc(req_id, client, pv)
      end

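      # Switches the PV's reclaim policy to Retain so the backing data survives while
      # its PVC is deleted and recreated.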
      def retain_pv(req_id, client, volume_id)
        pv = client.find_pv_by_volume_id(volume_id)
        log_with_id(req_id, "Found PV with volume_id #{volume_id}: #{pv}")
        if pv.dig("spec", "persistentVolumeReclaimPolicy") != "Retain"
          pv["spec"]["persistentVolumeReclaimPolicy"] = "Retain"
          client.update_pv(pv)
          log_with_id(req_id, "Updated PV to retain")
        end
        pv
      end

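      # Recreates the PVC so it can bind to a fresh PV on another node. A trimmed copy
      # of the PVC is stashed on the PV (old-pvc-object annotation) so this step can be
      # retried even after the original PVC object is gone.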
      def recreate_pvc(req_id, client, pv)
        pvc_namespace, pvc_name = pv["spec"]["claimRef"].values_at("namespace", "name")
        pv_name = pv.dig("metadata", "name")
        begin
          pvc = client.get_pvc(pvc_namespace, pvc_name)
        rescue ObjectNotFoundError
          # The PVC has already been deleted; fall back to the copy serialized onto the PV.
          # Re-raise the original error if the PV does not carry that annotation.
          old_pvc_object = pv.dig("metadata", "annotations", OLD_PVC_OBJECT_ANNOTATION_KEY)
          if old_pvc_object.nil? || old_pvc_object.empty?
            raise
          end

          pvc = YAML.load(Base64.decode64(old_pvc_object))
        end
        log_with_id(req_id, "Found matching PVC for PV #{pv_name}: #{pvc}")
        pvc_uid = pvc.dig("metadata", "uid")
        pvc_deletion_timestamp = pvc.dig("metadata", "deletionTimestamp")
        trim_pvc(pvc, pv_name)
        log_with_id(req_id, "Trimmed PVC for recreation: #{pvc}")

        client.patch_resource("pv", pv_name, OLD_PVC_OBJECT_ANNOTATION_KEY, Base64.strict_encode64(YAML.dump(pvc)))

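        # Dynamically provisioned PV names embed the UID of the PVC they were bound to.
        # If they still match, this is the original PVC: delete it (clearing finalizers)
        # and recreate it. Otherwise a fresh PVC already exists and only needs the
        # old-pv-name annotation.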
        if pvc_uid == pv_name.delete_prefix("pvc-")
          if !pvc_deletion_timestamp
            client.delete_pvc(pvc_namespace, pvc_name)
            log_with_id(req_id, "Deleted PVC #{pvc_namespace}/#{pvc_name}")
            client.remove_pvc_finalizers(pvc_namespace, pvc_name)
            log_with_id(req_id, "Removed PVC finalizers #{pvc_namespace}/#{pvc_name}")
          end
          client.create_pvc(pvc)
          log_with_id(req_id, "Recreated PVC with the new spec")
        else
          # The PVC has already been recreated. At this point we don't know whether we
          # created it or the StatefulSet controller did; either way, make sure the
          # csi.ubicloud.com/old-pv-name annotation is set.
          client.patch_resource("pvc", pvc_name, OLD_PV_NAME_ANNOTATION_KEY, pv_name, namespace: pvc_namespace)
        end
      end

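      # Strips server-populated metadata and binding information from the PVC so it can
      # be resubmitted as a new object, and records the old PV name for migration.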
      def trim_pvc(pvc, pv_name)
        pvc["metadata"]["annotations"] ||= {}
        %W[#{OLD_PVC_OBJECT_ANNOTATION_KEY} volume.kubernetes.io/selected-node pv.kubernetes.io/bind-completed].each do |key|
          pvc["metadata"]["annotations"].delete(key)
        end
        %w[resourceVersion uid creationTimestamp deletionTimestamp deletionGracePeriodSeconds].each do |key|
          pvc["metadata"].delete(key)
        end
        pvc["spec"].delete("volumeName")
        pvc.delete("status")
        pvc["metadata"]["annotations"][OLD_PV_NAME_ANNOTATION_KEY] = pv_name
        pvc
      end

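      # Publishes the volume to the workload by bind-mounting the staging path onto the
      # target path.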
      def node_publish_volume(req, _call)
        log_request_response(req, "node_publish_volume") do |req_id|
          staging_path = req.staging_target_path
          target_path = req.target_path

          unless is_mounted?(target_path, req_id:)
            FileUtils.mkdir_p(target_path)
            output, ok = run_cmd("mount", "--bind", staging_path, target_path, req_id:)
            unless ok
              raise "Failed to bind mount #{staging_path} to #{target_path}: #{output}"
            end
          end

          NodePublishVolumeResponse.new
        rescue => e
          log_and_raise(req_id, e)
        end
      end

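      # Unpublishes the volume by unmounting the bind mount at the target path.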
      def node_unpublish_volume(req, _call)
        log_request_response(req, "node_unpublish_volume") do |req_id|
          target_path = req.target_path

          if is_mounted?(target_path, req_id:)
            output, ok = run_cmd("umount", "-q", target_path, req_id:)
            unless ok
              raise "Failed to unmount #{target_path}: #{output}"
            end
          else
            log_with_id(req_id, "#{target_path} is not mounted, skipping umount")
          end

          NodeUnpublishVolumeResponse.new
        rescue => e
          log_and_raise(req_id, e)
        end
      end
    end
  end
end
